// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.

use std::borrow::Cow;
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::task::Poll;

use bytes::Bytes;
use deno_core::error::AnyError;
use deno_core::futures::future::poll_fn;
use deno_core::op2;
use deno_core::serde::Serialize;
use deno_core::AsyncRefCell;
use deno_core::BufView;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::JsBuffer;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_net::raw::take_network_stream_resource;
use deno_net::raw::NetworkStream;
use h2;
use h2::Reason;
use h2::RecvStream;
use http;
use http::request::Parts;
use http::HeaderMap;
use http::Response;
use http::StatusCode;
use reqwest::header::HeaderName;
use reqwest::header::HeaderValue;
use url::Url;

pub struct Http2Client {
    pub client: AsyncRefCell<h2::client::SendRequest<BufView>>,
    pub url: Url,
}

impl Resource for Http2Client {
    fn name(&self) -> Cow<str> {
        "http2Client".into()
    }
}

#[derive(Debug)]
pub struct Http2ClientConn {
    pub conn: AsyncRefCell<h2::client::Connection<NetworkStream, BufView>>,
    cancel_handle: CancelHandle,
}

impl Resource for Http2ClientConn {
    fn name(&self) -> Cow<str> {
        "http2ClientConnection".into()
    }

    fn close(self: Rc<Self>) {
        self.cancel_handle.cancel()
    }
}

#[derive(Debug)]
pub struct Http2ClientStream {
    pub response: AsyncRefCell<h2::client::ResponseFuture>,
    pub stream: AsyncRefCell<h2::SendStream<BufView>>,
}

impl Resource for Http2ClientStream {
    fn name(&self) -> Cow<str> {
        "http2ClientStream".into()
    }
}

#[derive(Debug)]
pub struct Http2ClientResponseBody {
    pub body: AsyncRefCell<h2::RecvStream>,
    pub trailers_rx: AsyncRefCell<Option<tokio::sync::oneshot::Receiver<Option<HeaderMap>>>>,
    pub trailers_tx: AsyncRefCell<Option<tokio::sync::oneshot::Sender<Option<HeaderMap>>>>,
}

impl Resource for Http2ClientResponseBody {
    fn name(&self) -> Cow<str> {
        "http2ClientResponseBody".into()
    }
}

#[derive(Debug)]
pub struct Http2ServerConnection {
    pub conn: AsyncRefCell<h2::server::Connection<NetworkStream, BufView>>,
}

impl Resource for Http2ServerConnection {
    fn name(&self) -> Cow<str> {
        "http2ServerConnection".into()
    }
}

pub struct Http2ServerSendResponse {
    pub send_response: AsyncRefCell<h2::server::SendResponse<BufView>>,
}

impl Resource for Http2ServerSendResponse {
    fn name(&self) -> Cow<str> {
        "http2ServerSendResponse".into()
    }
}

#[op2(async)]
#[serde]
pub async fn op_http2_connect(
    state: Rc<RefCell<OpState>>,
    #[smi] rid: ResourceId,
    #[string] url: String,
) -> Result<(ResourceId, ResourceId), AnyError> {
    // No permission check necessary because we're using an existing connection
    let network_stream = {
        let mut state = state.borrow_mut();
        take_network_stream_resource(&mut state.resource_table, rid)?
    };

    let url = Url::parse(&url)?;

    let (client, conn) = h2::client::Builder::new().handshake(network_stream).await?;
    let mut state = state.borrow_mut();
    let client_rid = state.resource_table.add(Http2Client {
        client: AsyncRefCell::new(client),
        url,
    });
    let conn_rid = state.resource_table.add(Http2ClientConn {
        conn: AsyncRefCell::new(conn),
        cancel_handle: CancelHandle::new(),
    });
    Ok((client_rid, conn_rid))
}

#[op2(async)]
#[smi]
pub async fn op_http2_listen(
    state: Rc<RefCell<OpState>>,
    #[smi] rid: ResourceId,
) -> Result<ResourceId, AnyError> {
    let stream = take_network_stream_resource(&mut state.borrow_mut().resource_table, rid)?;

    let conn = h2::server::Builder::new().handshake(stream).await?;
    Ok(state
        .borrow_mut()
        .resource_table
        .add(Http2ServerConnection {
            conn: AsyncRefCell::new(conn),
        }))
}

#[op2(async)]
#[serde]
pub async fn op_http2_accept(
    state: Rc<RefCell<OpState>>,
    #[smi] rid: ResourceId,
) -> Result<Option<(Vec<(ByteString, ByteString)>, ResourceId, ResourceId)>, AnyError> {
    let resource = state
        .borrow()
        .resource_table
        .get::<Http2ServerConnection>(rid)?;
    let mut conn = RcRef::map(&resource, |r| &r.conn).borrow_mut().await;
    if let Some(res) = conn.accept().await {
        let (req, resp) = res?;
        let (parts, body) = req.into_parts();
        let (trailers_tx, trailers_rx) = tokio::sync::oneshot::channel();
        let stm = state
            .borrow_mut()
            .resource_table
            .add(Http2ClientResponseBody {
                body: AsyncRefCell::new(body),
                trailers_rx: AsyncRefCell::new(Some(trailers_rx)),
                trailers_tx: AsyncRefCell::new(Some(trailers_tx)),
            });

        let Parts {
            uri,
            method,
            headers,
            ..
        } = parts;
        let mut req_headers = Vec::with_capacity(headers.len() + 4);
        req_headers.push((
            ByteString::from(":method"),
            ByteString::from(method.as_str()),
        ));
        req_headers.push((
            ByteString::from(":scheme"),
            ByteString::from(uri.scheme().map(|s| s.as_str()).unwrap_or("http")),
        ));
        req_headers.push((
            ByteString::from(":path"),
            ByteString::from(uri.path_and_query().map(|p| p.as_str()).unwrap_or("")),
        ));
        req_headers.push((
            ByteString::from(":authority"),
            ByteString::from(uri.authority().map(|a| a.as_str()).unwrap_or("")),
        ));
        for (key, val) in headers.iter() {
            req_headers.push((key.as_str().into(), val.as_bytes().into()));
        }

        let resp = state
            .borrow_mut()
            .resource_table
            .add(Http2ServerSendResponse {
                send_response: AsyncRefCell::new(resp),
            });

        Ok(Some((req_headers, stm, resp)))
    } else {
        Ok(None)
    }
}

#[op2(async)]
#[serde]
pub async fn op_http2_send_response(
    state: Rc<RefCell<OpState>>,
    #[smi] rid: ResourceId,
    #[smi] status: u16,
    #[serde] headers: Vec<(ByteString, ByteString)>,
) -> Result<(ResourceId, u32), AnyError> {
    let resource = state
        .borrow()
        .resource_table
        .get::<Http2ServerSendResponse>(rid)?;
    let mut send_response = RcRef::map(resource, |r| &r.send_response)
        .borrow_mut()
        .await;
    let mut response = Response::new(());
    if let Ok(status) = StatusCode::from_u16(status) {
        *response.status_mut() = status;
    }
    for (name, value) in headers {
        response.headers_mut().append(
            // PATCH(denoland/deno#24780): Mitigates denoland/deno#24678
            HeaderName::from_bytes(&name).unwrap(),
            HeaderValue::from_bytes(&value).unwrap(),
        );
    }

    let stream = send_response.send_response(response, false)?;
    let stream_id = stream.stream_id();

    Ok((rid, stream_id.into()))
}

#[op2(async)]
pub async fn op_http2_poll_client_connection(
    state: Rc<RefCell<OpState>>,
    #[smi] rid: ResourceId,
) -> Result<(), AnyError> {
    let resource = state.borrow().resource_table.get::<Http2ClientConn>(rid)?;

    let cancel_handle = RcRef::map(resource.clone(), |this| &this.cancel_handle);
    let mut conn = RcRef::map(resource, |this| &this.conn).borrow_mut().await;

    match (&mut *conn).or_cancel(cancel_handle).await {
        Ok(result) => result?,
        Err(_) => {
            // TODO(bartlomieju): probably need a better mechanism for closing the connection

            // cancelled
        }
    }

    Ok(())
}

#[op2(async)]
#[serde]
pub async fn op_http2_client_request(
    state: Rc<RefCell<OpState>>,
    #[smi] client_rid: ResourceId,
    // TODO(bartlomieju): maybe use a vector with fixed layout to save sending
    // 4 strings of keys?
    #[serde] mut pseudo_headers: HashMap<String, String>,
    #[serde] headers: Vec<(ByteString, ByteString)>,
) -> Result<(ResourceId, u32), AnyError> {
    let resource = state
        .borrow()
        .resource_table
        .get::<Http2Client>(client_rid)?;

    let url = resource.url.clone();

    let pseudo_path = pseudo_headers.remove(":path").unwrap_or("/".to_string());
    let pseudo_method = pseudo_headers
        .remove(":method")
        .unwrap_or("GET".to_string());
    // TODO(bartlomieju): handle all pseudo-headers (:authority, :scheme)
    let _pseudo_authority = pseudo_headers
        .remove(":authority")
        .unwrap_or("/".to_string());
    let _pseudo_scheme = pseudo_headers
        .remove(":scheme")
        .unwrap_or("http".to_string());

    let url = url.join(&pseudo_path)?;

    let mut req = http::Request::builder()
        .uri(url.as_str())
        .method(pseudo_method.as_str());

    for (name, value) in headers {
        req.headers_mut().unwrap().append(
            // PATCH(denoland/deno#24780): Mitigates denoland/deno#24678
            HeaderName::from_bytes(&name).unwrap(),
            HeaderValue::from_bytes(&value).unwrap(),
        );
    }

    let request = req.body(()).unwrap();

    let resource = {
        let state = state.borrow();
        state.resource_table.get::<Http2Client>(client_rid)?
    };
    let mut client = RcRef::map(&resource, |r| &r.client).borrow_mut().await;
    poll_fn(|cx| client.poll_ready(cx)).await?;
    let (response, stream) = client.send_request(request, false).unwrap();
    let stream_id = stream.stream_id();
    let stream_rid = state.borrow_mut().resource_table.add(Http2ClientStream {
        response: AsyncRefCell::new(response),
        stream: AsyncRefCell::new(stream),
    });
    Ok((stream_rid, stream_id.into()))
}

#[op2(async)]
pub async fn op_http2_client_send_data(
    state: Rc<RefCell<OpState>>,
    #[smi] stream_rid: ResourceId,
    #[buffer] data: JsBuffer,
    end_of_stream: bool,
) -> Result<(), AnyError> {
    let resource = state
        .borrow()
        .resource_table
        .get::<Http2ClientStream>(stream_rid)?;
    let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await;

    stream.send_data(data.to_vec().into(), end_of_stream)?;
    Ok(())
}

#[op2(async)]
pub async fn op_http2_client_reset_stream(
    state: Rc<RefCell<OpState>>,
    #[smi] stream_rid: ResourceId,
    #[smi] code: u32,
) -> Result<(), AnyError> {
    let resource = state
        .borrow()
        .resource_table
        .get::<Http2ClientStream>(stream_rid)?;
    let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await;
    stream.send_reset(h2::Reason::from(code));
    Ok(())
}

#[op2(async)]
pub async fn op_http2_client_send_trailers(
    state: Rc<RefCell<OpState>>,
    #[smi] stream_rid: ResourceId,
    #[serde] trailers: Vec<(ByteString, ByteString)>,
) -> Result<(), AnyError> {
    let resource = state
        .borrow()
        .resource_table
        .get::<Http2ClientStream>(stream_rid)?;
    let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await;

    let mut trailers_map = http::HeaderMap::new();
    for (name, value) in trailers {
        trailers_map.insert(
            HeaderName::from_bytes(&name).unwrap(),
            HeaderValue::from_bytes(&value).unwrap(),
        );
    }

    stream.send_trailers(trailers_map)?;
    Ok(())
}

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Http2ClientResponse {
    headers: Vec<(ByteString, ByteString)>,
    body_rid: ResourceId,
    status_code: u16,
}

#[op2(async)]
#[serde]
pub async fn op_http2_client_get_response(
    state: Rc<RefCell<OpState>>,
    #[smi] stream_rid: ResourceId,
) -> Result<(Http2ClientResponse, bool), AnyError> {
    let resource = state
        .borrow()
        .resource_table
        .get::<Http2ClientStream>(stream_rid)?;
    let mut response_future = RcRef::map(&resource, |r| &r.response).borrow_mut().await;

    let response = (&mut *response_future).await?;

    let (parts, body) = response.into_parts();
    let status = parts.status;
    let mut res_headers = Vec::new();
    for (key, val) in parts.headers.iter() {
        res_headers.push((key.as_str().into(), val.as_bytes().into()));
    }
    let end_stream = body.is_end_stream();

    let (trailers_tx, trailers_rx) = tokio::sync::oneshot::channel();
    let body_rid = state
        .borrow_mut()
        .resource_table
        .add(Http2ClientResponseBody {
            body: AsyncRefCell::new(body),
            trailers_rx: AsyncRefCell::new(Some(trailers_rx)),
            trailers_tx: AsyncRefCell::new(Some(trailers_tx)),
        });
    Ok((
        Http2ClientResponse {
            headers: res_headers,
            body_rid,
            status_code: status.into(),
        },
        end_stream,
    ))
}

enum DataOrTrailers {
    Data(Bytes),
    Trailers(HeaderMap),
    Eof,
}

fn poll_data_or_trailers(
    cx: &mut std::task::Context,
    body: &mut RecvStream,
) -> Poll<Result<DataOrTrailers, h2::Error>> {
    loop {
        if let Poll::Ready(trailers) = body.poll_trailers(cx) {
            if let Some(trailers) = trailers? {
                return Poll::Ready(Ok(DataOrTrailers::Trailers(trailers)));
            } else {
                return Poll::Ready(Ok(DataOrTrailers::Eof));
            }
        }
        if let Poll::Ready(data) = body.poll_data(cx) {
            if let Some(data) = data {
                return Poll::Ready(Ok(DataOrTrailers::Data(data?)));
            }
            // If data is None, loop one more time to check for trailers
            continue;
        }
        // Return pending here as poll_data will keep the waker
        return Poll::Pending;
    }
}

#[op2(async)]
#[serde]
pub async fn op_http2_client_get_response_body_chunk(
    state: Rc<RefCell<OpState>>,
    #[smi] body_rid: ResourceId,
) -> Result<(Option<Vec<u8>>, bool, bool), AnyError> {
    let resource = state
        .borrow()
        .resource_table
        .get::<Http2ClientResponseBody>(body_rid)?;
    let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;

    loop {
        let result = poll_fn(|cx| poll_data_or_trailers(cx, &mut body)).await;
        if let Err(err) = result {
            let reason = err.reason();
            if let Some(reason) = reason {
                if reason == Reason::CANCEL {
                    return Ok((None, false, true));
                }
            }
            return Err(err.into());
        }
        match result.unwrap() {
            DataOrTrailers::Data(data) => {
                return Ok((Some(data.to_vec()), false, false));
            }
            DataOrTrailers::Trailers(trailers) => {
                if let Some(trailers_tx) = RcRef::map(&resource, |r| &r.trailers_tx)
                    .borrow_mut()
                    .await
                    .take()
                {
                    _ = trailers_tx.send(Some(trailers));
                };

                continue;
            }
            DataOrTrailers::Eof => {
                RcRef::map(&resource, |r| &r.trailers_tx)
                    .borrow_mut()
                    .await
                    .take();
                return Ok((None, true, false));
            }
        };
    }
}

#[op2(async)]
#[serde]
pub async fn op_http2_client_get_response_trailers(
    state: Rc<RefCell<OpState>>,
    #[smi] body_rid: ResourceId,
) -> Result<Option<Vec<(ByteString, ByteString)>>, AnyError> {
    let resource = state
        .borrow()
        .resource_table
        .get::<Http2ClientResponseBody>(body_rid)?;
    let trailers = RcRef::map(&resource, |r| &r.trailers_rx)
        .borrow_mut()
        .await
        .take();
    if let Some(trailers) = trailers {
        if let Ok(Some(trailers)) = trailers.await {
            let mut v = Vec::with_capacity(trailers.len());
            for (key, value) in trailers.iter() {
                v.push((
                    ByteString::from(key.as_str()),
                    ByteString::from(value.as_bytes()),
                ));
            }
            Ok(Some(v))
        } else {
            Ok(None)
        }
    } else {
        Ok(None)
    }
}
